-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add NettyByteBuf to GetBlobOperation #1314
Conversation
1994da0
to
bbe3cc4
Compare
Codecov Report
@@ Coverage Diff @@
## master #1314 +/- ##
============================================
- Coverage 72.04% 72.01% -0.04%
- Complexity 6699 6714 +15
============================================
Files 486 486
Lines 38163 38207 +44
Branches 4843 4843
============================================
+ Hits 27495 27513 +18
- Misses 9352 9382 +30
+ Partials 1316 1312 -4
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good. A few comments.
@@ -1197,18 +1190,15 @@ void handleBody(ResponseInfo responseInfo, InputStream payload, MessageMetadata | |||
} | |||
} | |||
blobType = blobData.getBlobType(); | |||
chunkIndexToBuffer = new TreeMap<>(); | |||
chunkIndexToResponseInfo = new ConcurrentHashMap<>(); | |||
chunkIndexToBuf = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be ConcurrentHashMap
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will remove ByteBuf from this hash map in other threads, so at least it has to be thread safe.
ByteBuffer chunkBuffer = blobData.getStream().getByteBuffer(); | ||
responseInfo.retain(); | ||
chunkIndexToResponseInfo.put(chunkIndex, responseInfo); | ||
ByteBuf chunkBuffer = blobData.getAndRelease(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's call it chunkBuf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
@@ -838,7 +836,7 @@ void handleResponse(ResponseInfo responseInfo, GetResponse getResponse) { | |||
* @param targetBlobId the {@link BlobId} of the blob. | |||
* @return {@code true} if a crypto job was launched, otherwise {@code false}. | |||
*/ | |||
protected boolean maybeLaunchCryptoJob(ByteBuffer dataBuffer, byte[] userMetadata, ByteBuffer encryptionKey, | |||
protected boolean maybeLaunchCryptoJob(ByteBuf dataBuffer, byte[] userMetadata, ByteBuffer encryptionKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making variable naming consistent, if its type is ByteBuf, we call it **Buf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
@@ -857,7 +855,7 @@ protected boolean maybeLaunchCryptoJob(ByteBuffer dataBuffer, byte[] userMetadat | |||
decryptCallbackResultInfo = new DecryptCallBackResultInfo(); | |||
progressTracker.initializeCryptoJobTracker(CryptoJobType.DECRYPTION); | |||
decryptJobMetricsTracker.onJobSubmission(); | |||
cryptoJobHandler.submitJob(new DecryptJob(targetBlobId, encryptionKey, dataBuffer, | |||
cryptoJobHandler.submitJob(new DecryptJob(targetBlobId, encryptionKey, dataBuffer.retainedDuplicate(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the dataBuffer
is released in this method. (Correct me if I misunderstood)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dataBuffer is not created in this function, so we don't release it here. But it will be released in the DecryptJob.
return stream; | ||
} | ||
|
||
/** | ||
* Return the netty {@link ByteBuf} and then transfer the ownship to the caller. It's not safe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: ownership. And why it's not safe to call this more than once? You have byteBuf == null
check here. Caller is supposed to check result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is not an reentrant method, calling this method multiple time would get you multiple result so it's better not to call it twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ownship -> ownership
return null; | ||
} | ||
ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes()); | ||
byteBuf.writeBytes(temp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why writeBytes here? I feel like it's supposed to be readBytes from byteBuf to temp, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are right, updated.
ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java
Show resolved
Hide resolved
} | ||
output.flip(); | ||
return output; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this method can be replaced by ByteBufferInputStream
and call getByteBuffer()
to obtain the byteBuffer. At least we can consider reusing the code in the ctor of ByteBufferInputStream
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
06f5bc5
to
3d31576
Compare
...messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java
Outdated
Show resolved
Hide resolved
...essageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java
Outdated
Show resolved
Hide resolved
/** | ||
* Decrease the reference count of underlying response. | ||
*/ | ||
public void release() { | ||
if (response != null) { | ||
ReferenceCountUtil.release(response); | ||
response = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this only be set to null when the refcount reaches zero?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now that we remove the retain function, we can be sure that we only need to release it once.
ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatRecord.java
Outdated
Show resolved
Hide resolved
3d31576
to
4437544
Compare
4437544
to
5028916
Compare
97cdcf9
to
52b959f
Compare
52b959f
to
bb98acb
Compare
bb98acb
to
0571c0a
Compare
0571c0a
to
54f9606
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can merge after these comments are addressed
return stream; | ||
} | ||
|
||
/** | ||
* Return the netty {@link ByteBuf} and then transfer the ownship to the caller. It's not safe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ownship -> ownership
@@ -73,14 +74,18 @@ public void run() { | |||
Object containerKey = kms.getKey(blobId.getAccountId(), blobId.getContainerId()); | |||
Object perBlobKey = cryptoService.decryptKey(encryptedPerBlobKey, containerKey); | |||
if (encryptedBlobContent != null) { | |||
decryptedBlobContent = cryptoService.decrypt(encryptedBlobContent, perBlobKey); | |||
decryptedBlobContent = cryptoService.decrypt(encryptedBlobContent.nioBuffer(), perBlobKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the CryptoService.decrypt(ByteBuf, Key)
method here? I think the default method has some special handling for edge cases with composite buffers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
planning on call the other method in a different PR, where I will change bytebuffer in DecryptJob and EncryptJob to ByteBuf.
This starts to use Netty ByteBuf in GetBlobOperation.
This PR seeks the least intrusive way to introduce Netty ByteBuf to GetBlobOperation, so it ignores a lot of places where the ByteBuffer can be replaced by Netty ByteBuf.
The major change is to change the buffer/inputstream in BlobData to use Netty ByteBuf.